/******************************************************************************

VERSION $OpenBSD: mpool.libtp,v 1.3 1999/02/15 05:11:25 millert Exp $
PACKAGE: 	User Level Shared Memory Manager

DESCRIPTION: 	
	This package provides a buffer pool interface implemented as
	a collection of file pages mapped into shared memory.

	Based on Mark's buffer manager

ROUTINES: 
    External
	buf_alloc
	buf_flags
	buf_get
	buf_init
	buf_last
	buf_open
	buf_pin
	buf_sync
	buf_unpin
    Internal
	bf_assign_buf
	bf_fid_to_fd
	bf_newbuf
	bf_put_page
	

******************************************************************************/
#include	<sys/types.h>
#include	<assert.h>
#include	<sys/file.h>
#include	<sys/stat.h>
#include	<stdio.h>
#include	<errno.h>
#include	"list.h"
#include	"user.h"
#include	"txn_sys.h"
#include	"buf.h"
#include	"semkeys.h"
#include	"error.h"

/*
    we need to translate between some type of file id that the user 
    process passes and a file descriptor.  For now, it's a nop.
*/
#define GET_MASTER      get_sem ( buf_spinlock )
#define RELEASE_MASTER  release_sem ( buf_spinlock )

#define	LRUID	*buf_lru
#define	LRUP	(bufhdr_table+*buf_lru)
#define	MRU	bufhdr_table[*buf_lru].lru.prev

/* Global indicator that you have started reusing buffers */
int	do_statistics = 0;
/*
    Process Statics (pointers into shared memory)
*/
static	BUF_T	*buf_table = 0;
static	BUFHDR_T	*bufhdr_table;
static	int	*buf_hash_table;
static	int	*buf_lru;		/* LRU is the free list */
static	int	buf_spinlock;
static	FINFO_T	*buf_fids;
static	int	*buf_sp;		/* Pointer to string free space */
static	char	*buf_strings;

/* Process Local FID->FD table */
static	int	fds[NUM_FILE_ENTRIES];

/* Static routines */
static	BUFHDR_T	*bf_assign_buf();
static	int		bf_fid_to_fd();
static	BUFHDR_T	*bf_newbuf();
static	int		bf_put_page();

/*
    Return  0 on success
	    1 on failure
*/
extern int
buf_init ( )
{
    ADDR_T	buf_region;
    BUFHDR_T	*bhp;
    int		i;
    int		ref_count;
    int		*spinlockp;

    /*
	Initialize Process local structures
    */
    for ( i = 0; i < NUM_FILE_ENTRIES; i++ ) {
	fds[i] = -1;
    }

    buf_region = attach_region ( BUF_REGION_NAME, BUF_REGION_NUM,
				 BUF_REGION_SIZE, &ref_count );
    if ( !buf_region ) {
	return (1);
    }
    error_log3 ( "Buf Region: ADDR: %d ID: %d SIZE: %d\n", buf_region,
		    BUF_REGION_NUM, BUF_REGION_SIZE );

    buf_table = (BUF_T *)buf_region;
    bufhdr_table = (BUFHDR_T *)(buf_table + NUM_BUFS);
    buf_hash_table = (int *)(bufhdr_table + NUM_BUFS);
    buf_lru = buf_hash_table + NUMTABLE_ENTRIES;
    spinlockp = buf_lru + 1;
    buf_fids = (FINFO_T *)(spinlockp+1);
    buf_sp = (int *)(buf_fids + NUM_FILE_ENTRIES);
    buf_strings = (char *)(buf_sp + 1);

    /* Create locking spinlock (gets creating holding the lock) */
    buf_spinlock = create_sem ( BUF_SPIN_NAME, BUF_SPIN_NUM, ref_count <= 1 );
    if ( buf_spinlock < 0 )  {
	return(1);
    }
    if ( ref_count <= 1 ) {
	*spinlockp = buf_spinlock;

	/* Now initialize the buffer manager */

	/* 1. Free list */
	*buf_lru = 0;

	/* 2. Buffer headers */
	for ( i = 0, bhp = bufhdr_table; i < NUM_BUFS; bhp++, i++ ) {
		bhp->lru.next = i+1;
		bhp->lru.prev = i-1;
		bhp->flags = 0;				/* All Flags off */
		bhp->refcount = 0;
		bhp->wait_proc = -1;			/* No sleepers */
		LISTPE_INIT ( hash, bhp, i );		/* Hash chains */
	}
	bufhdr_table[0].lru.prev = NUM_BUFS-1;
	bufhdr_table[NUM_BUFS-1].lru.next = 0;

	/* 3. Hash Table */
	for ( i = 0; i < NUMTABLE_ENTRIES; i++ ) {
		buf_hash_table[i] = NUM_BUFS;
	}

	/* 4. File ID Table */
	for ( i = 0; i < NUM_FILE_ENTRIES; i++ ) {
		buf_fids[i].offset = -1;
		buf_fids[i].npages = -1;
		buf_fids[i].refcount = 0;
	}

	/* 5. Free String Pointer */
	*buf_sp = (FILE_NAME_LEN*NUM_FILE_ENTRIES);
	if (RELEASE_MASTER) {
		return(1);
	}
	error_log0 ( "Initialized buffer region\n" );
    } 
    return (0);
}

extern	void
buf_exit()
{
    int	ref;
    int	i;

    /* Flush Buffer Pool on Exit */
    for ( i = 0; i < NUM_FILE_ENTRIES; i++ ) {
	if ( fds[i] != -1 ) {
		close ( fds[i] );
	}
    }
    if ( buf_table ) {
	detach_region ( buf_table, BUF_REGION_NUM, BUF_REGION_SIZE, &ref );
    }
    return;
}

/*
	We need an empty buffer.  Find the LRU unpinned NON-Dirty page.
*/
static BUFHDR_T	*
bf_newbuf()
{
    int		fd;
    int		lruid;
    int		nbytes;
    int		ndx;
    BUFHDR_T	*bhp;

    lruid = LRUID;
    for ( bhp = LRUP; 
	  bhp->flags & (BUF_PINNED|BUF_IO_IN_PROGRESS); 
	  bhp = LISTP_NEXTP (bufhdr_table, lru, bhp ) ) {

	if ( bhp->lru.next == lruid ) {
		/* OUT OF BUFFERS */
		error_log1 ( "All buffers are pinned.  %s\n",
				"Unable to grant buffer request" );
		return(NULL);
	}
    }
    /* BHP can be used */
    if ( bhp->flags & BUF_DIRTY ) {
	do_statistics = 1;
	/* 
	    MIS  Check for log flushed appropriately
	*/
	fd = bf_fid_to_fd(bhp->id.file_id);
	if ( fd == -1 ) {
	    error_log1 ("Invalid fid %d\n", bhp->id.file_id);
	    return(NULL);
	}
	if ( bf_put_page(fd, bhp) < 0 ) {
	    return(NULL);
	}
    }
    /* Update Hash Pointers */
    ndx = BUF_HASH ( bhp->id.file_id, bhp->id.obj_id );
    LISTP_REMOVE(bufhdr_table, hash, bhp);
    if ( buf_hash_table[ndx] == (bhp-bufhdr_table) ) {
	if ( bhp->hash.next != (bhp-bufhdr_table) ) {
		buf_hash_table[ndx] = bhp->hash.next;
	} else {
		buf_hash_table[ndx] = NUM_BUFS;
	}
    }
    INIT_BUF(bhp); 

    return(bhp);
}
/*
    buf_alloc

    Add a page to a file and return a buffer for it.

*/
ADDR_T
buf_alloc ( fid, new_pageno )
int	fid;
int	*new_pageno;
{
	BUFHDR_T	*bhp;
	int	fd;
	int	len;
	int	ndx;
	OBJ_T	fobj;

	if (GET_MASTER) {
		return(NULL);
	}
	if ( buf_fids[fid].npages == -1 ) {
	    /* initialize npages field */
	    fd = bf_fid_to_fd ( fid );
	}
	assert (fid < NUM_FILE_ENTRIES);

	*new_pageno = buf_fids[fid].npages;
	if ( *new_pageno == -1 ) {
	    RELEASE_MASTER;
	    return ( NULL );
	}
	buf_fids[fid].npages++;
	ndx = BUF_HASH ( fid, *new_pageno );
	fobj.file_id = fid;
	fobj.obj_id  = *new_pageno;
	bhp = bf_assign_buf ( ndx, &fobj, BF_PIN|BF_DIRTY|BF_EMPTY, &len );
	if ( RELEASE_MASTER ) {
		/* Memory leak */
		return(NULL);
	}
	if ( bhp ) {
	    return ((ADDR_T)(buf_table+(bhp-bufhdr_table)));
	} else {
	    return ( NULL );
	}
}


/*
	Buffer Flags
	BF_DIRTY		Mark page as dirty
	BF_EMPTY		Don't initialize page, just get buffer
	BF_PIN			Retrieve with pin 

MIS
Might want to add a flag that sets an LSN for this buffer is the
DIRTY flag is set

Eventually, you may want a flag that indicates the I/O and lock
request should be shipped off together, but not for now.
*/
extern ADDR_T
buf_get ( file_id, page_id, flags, len )
int	file_id;
int	page_id;
u_long	flags;
int	*len;		/* Number of bytes read into buffer */
{
	BUFHDR_T	*bhp;
	int		bufid;
	int		fd;
	int		ndx;
	int		next_bufid;
	int		stat;
	OBJ_T		fobj;	

	ndx = BUF_HASH ( file_id, page_id );
	fobj.file_id = (long) file_id;
	fobj.obj_id = (long) page_id;
	if ( GET_MASTER ) {
		return(NULL);
	}
	/*
		This could be a for loop, but we lose speed
		by making all the cases general purpose so we
		optimize for the no-collision case. 
	*/
	bufid = buf_hash_table[ndx]; 
	if ( bufid < NUM_BUFS ) {
	    for ( bhp = bufhdr_table+bufid; 
		  !OBJ_EQ (bhp->id, fobj) || !(bhp->flags & BUF_VALID);
		  bhp = LISTP_NEXTP ( bufhdr_table, hash, bhp ) ) {

		if ( bhp->hash.next == bufid ) {
		    goto not_found;
		}
	    }
/* found */
	    if ( flags & BF_PIN ) {
		    bhp->flags |= BUF_PINNED;
		    bhp->refcount++;
#ifdef PIN_DEBUG
	fprintf(stderr, "buf_get: %X PINNED (%d)\n", 
			buf_table + (bhp-bufhdr_table), bhp->refcount);
#endif
	    }
	    if ( flags & BF_DIRTY ) {
		    bhp->flags |= BUF_DIRTY;
	    }

	    while ( bhp->flags & BUF_IO_IN_PROGRESS ) {
		/* MIS -- eventually err check here */
#ifdef DEBUG
		printf("About to sleep on %d (me: %d\n)\n", bhp->wait_proc,
			my_txnp - txn_table);
#endif
#ifdef WAIT_STATS
		buf_waits++;
#endif
		stat = proc_sleep_on ( &(bhp->wait_proc), buf_spinlock );
		if ( stat ) {
			/* Memory leak */
			return(NULL);
		}
		if (!( bhp->flags & BUF_IO_IN_PROGRESS) &&
		    (!OBJ_EQ (bhp->id, fobj) || !(bhp->flags & BUF_VALID))) {
			if (RELEASE_MASTER)
				return(NULL);
			return(buf_get ( file_id, page_id, flags, len ));
		}
	    }
	    MAKE_MRU( bhp );
	    *len = BUFSIZE;
	} else {
not_found:
	    /* If you get here, the page isn't in the hash table */
	    bhp = bf_assign_buf ( ndx, &fobj, flags, len );
	}
	/* Common code between found and not found */

	if ( bhp && bhp->flags & BUF_NEWPAGE ) {
	    *len = 0;
	}
	if (RELEASE_MASTER){
		/* Memory leak */
		return(NULL);
	}
	if ( bhp ) {
	    return ((ADDR_T)(buf_table+(bhp-bufhdr_table)));
	} else {
	    return ( NULL );
	}
}

/*
	MIS - do I want to add file links to buffer pool?
*/
extern int
buf_sync ( fid, close )
int	fid;
int	close;		/* should we dec refcount and possibly
			   invalidate all the buffers */
{
    int	i;
    int	fd;
    int	invalidate;
    BUFHDR_T	*bhp;

    if ( (fd = bf_fid_to_fd ( fid )) < 0 ) {
	return(1);
    }
    if (GET_MASTER) {
	return(1);
    }
    invalidate = (buf_fids[fid].refcount == 1 && close);
    if ( invalidate )
	for ( bhp = bufhdr_table, i = 0; i < NUM_BUFS; bhp++, i++ ) {
	    if (bhp->id.file_id == fid) {
		if ((bhp->flags & BF_DIRTY) && (bf_put_page( fd, bhp ) < 0)) {
			return(1);
		}
		bhp->id.file_id = -1;
	    }
	}
    if (invalidate || close)
	buf_fids[fid].refcount--;

    if (RELEASE_MASTER) {
	return(1);
    }
    return(0);


}

extern int
buf_flags ( addr, set_flags, unset_flags )
ADDR_T	addr;
u_long	set_flags;
u_long	unset_flags;
{
    int		bufid;
    BUFHDR_T	*bhp;

#ifdef PIN_DEBUG
	fprintf(stderr, "buf_flags: %X setting %s%s%s%s%s releasing %s%s%s%s%s\n",
	    addr, 
	    set_flags&BUF_DIRTY ? "DIRTY " : "", 
	    set_flags&BUF_VALID ? "VALID " : "", 
	    set_flags&BUF_PINNED ? "PINNED " : "", 
	    set_flags&BUF_IO_ERROR ? "IO_ERROR " : "", 
	    set_flags&BUF_IO_IN_PROGRESS ? "IO_IN_PROG " : "", 
	    set_flags&BUF_NEWPAGE ? "NEWPAGE " : "", 
	    unset_flags&BUF_DIRTY ? "DIRTY " : "", 
	    unset_flags&BUF_VALID ? "VALID " : "", 
	    unset_flags&BUF_PINNED ? "PINNED " : "", 
	    unset_flags&BUF_IO_ERROR ? "IO_ERROR " : "", 
	    unset_flags&BUF_IO_IN_PROGRESS ? "IO_IN_PROG " : "", 
	    unset_flags&BUF_NEWPAGE ? "NEWPAGE " : "" );
#endif
    if (!ADDR_OK(addr)) {
	error_log1 ( "buf_pin: Invalid Buffer Address %x\n", addr );
	return(1);
    }
    bufid = ((BUF_T *)addr) - buf_table;
    assert ( bufid < NUM_BUFS);
    bhp = &bufhdr_table[bufid];
    if (GET_MASTER) {
	return(1);
    }
    bhp->flags |= set_flags;
    if ( set_flags & BUF_PINNED ) {
	bhp->refcount++;
    }
    if ( set_flags & BUF_DIRTY ) {
	unset_flags |= BUF_NEWPAGE;
    }

    if ( unset_flags & BUF_PINNED ) {
	bhp->refcount--;
	if ( bhp->refcount ) {
		/* Turn off pin bit so it doesn't get unset */
		unset_flags &= ~BUF_PINNED;
	}
    }
    bhp->flags &= ~unset_flags;
    MAKE_MRU(bhp);
    if (RELEASE_MASTER) {
	return(1);
    }
    return(0);
}

/*
	Take a string name and produce an fid.

	returns -1 on error

	MIS -- this is a potential problem -- you keep actual names
		here -- what if people run from different directories?
*/
extern	int
buf_name_lookup ( fname )
char	*fname;
{
    int	i;
    int	fid;
    int	ndx;

    fid = -1;
    if (GET_MASTER) {
	    return(-1);
    }
    for ( i = 0; i < NUM_FILE_ENTRIES; i++ ) {
	if ( buf_fids[i].offset == -1 ) {
		fid = i;
	} else {
		if (!strcmp (fname, buf_strings+buf_fids[i].offset)) {
			if (RELEASE_MASTER) {
				return(-1);
			}
			buf_fids[i].refcount++;
			return(i);
		}
	}
    }
    if ( fid == -1 ) {
	error_log0 ( "No more file ID's\n" );
    } else {
	ndx = *buf_sp - strlen(fname) - 1;
	if ( ndx < 0 ) {
	    error_log0 ( "Out of string space\n" );
	    fid = -1;
	} else {
	    *buf_sp = ndx;
	    strcpy ( buf_strings+ndx, fname );
	    buf_fids[fid].offset = ndx;
	}
	buf_fids[fid].refcount = 1;
    }
    if (RELEASE_MASTER) {
	return(-1);
    }
    return(fid);
}

static	int
bf_fid_to_fd ( fid ) 
int	fid;
{
	struct stat sbuf;

	assert ( (fid < NUM_FILE_ENTRIES) && (buf_fids[fid].offset != -1) );
	if ( fds[fid] != -1 ) {
	    return(fds[fid]);

	}
	fds[fid] = open ( buf_strings+buf_fids[fid].offset, O_RDWR|O_CREAT, 
			  0666 );
	if ( fds[fid] < 0 ) {
	    error_log3 ( "Error Opening File %s FID: %d FD: %d.  Errno = %d\n", 
			    buf_strings+buf_fids[fid].offset, fid, fds[fid], 
			    errno );
	    return(-1);
	}
	error_log3 ( "Opening File %s FID: %d FD: %d\n", 
			buf_strings+buf_fids[fid].offset, fid, fds[fid] );
	if ( buf_fids[fid].npages == -1 ) {
		/* Initialize the npages field */
		if ( fstat ( fds[fid], &sbuf ) ) {
		    error_log3 ( "Error Fstating %s FID: %d.  Errno = %d\n", 
				buf_strings+buf_fids[fid].offset, fid, errno );
		} else {
		    buf_fids[fid].npages = ( sbuf.st_size / BUFSIZE );
		}
	}

	return ( fds[fid] );
}

static int
bf_put_page ( fd, bhp ) 
int	fd;
BUFHDR_T	*bhp;
{
	int	nbytes;

	assert ( (bhp-bufhdr_table) < NUM_BUFS );
	if ( lseek ( fd, bhp->id.obj_id << BUFSHIFT, SEEK_SET ) < 0 ) {
	    return(-1);
	}
	bhp->flags |= BUF_IO_IN_PROGRESS;
	if (RELEASE_MASTER) {
	    return(-1);
	}
	nbytes = write(fd, buf_table[bhp-bufhdr_table], BUFSIZE);
	if (GET_MASTER) {
	    return(-2);
	}
	if ( nbytes < 0 ) {
		error_log1 ("Write failed with error code %d\n", errno);
		return(-1);
	} else if ( nbytes != BUFSIZE ) {
		error_log1 ("Short write: %d bytes of %d\n", nbytes, BUFSIZE );
	}
	bhp->flags &= ~(BUF_DIRTY|BUF_IO_IN_PROGRESS);
	return (0);
}

static BUFHDR_T	*
bf_assign_buf ( ndx, obj, flags, len )
int	ndx;
OBJ_T	*obj;
u_long	flags;
int	*len;		/* Number of bytes read */
{
    BUFHDR_T	*bhp;
    int		fd;

    assert ( obj->file_id < NUM_FILE_ENTRIES );
    bhp = bf_newbuf();
    if ( !bhp ) {
	return(NULL);
    }
    OBJ_ASSIGN ( (*obj), bhp->id );
    if ( buf_hash_table[ndx] >= NUM_BUFS ) {
	buf_hash_table[ndx] = bhp-bufhdr_table;
    } else {
	LISTPE_INSERT ( bufhdr_table, hash, bhp, buf_hash_table[ndx] );
    }

    bhp->flags |= BUF_VALID;
    if ( flags & BF_PIN ) {
	bhp->flags |= BUF_PINNED;
	bhp->refcount++;
#ifdef PIN_DEBUG
	fprintf(stderr, "bf_assign_buf: %X PINNED (%d)\n", 
		buf_table + (bhp-bufhdr_table), bhp->refcount);
#endif
    }
    fd = bf_fid_to_fd(obj->file_id);
    if ( fd == -1 ) {
	error_log1 ("Invalid fid %d\n", obj->file_id);
	bhp->flags |= ~BUF_IO_ERROR;
	return(NULL);
    }
    if ( obj->obj_id >= buf_fids[obj->file_id].npages) {
	buf_fids[obj->file_id].npages = obj->obj_id+1;
	*len = 0;
    } else if ( flags & BF_EMPTY ) {
	*len = 0;
    } else {
	bhp->flags |= BUF_IO_IN_PROGRESS;
	if (RELEASE_MASTER) {
		return(NULL);
	}
	if ( lseek ( fd, obj->obj_id << BUFSHIFT, SEEK_SET ) < -1 ) {
	    error_log2 ("Unable to perform seek on file: %d  to page %d",
			obj->file_id, obj->obj_id );
	    bhp->flags &= ~BUF_IO_IN_PROGRESS;
	    bhp->flags |= ~BUF_IO_ERROR;
	    return(NULL);
	}
	*len = read(fd, buf_table[bhp-bufhdr_table], BUFSIZE);
	if ( *len < 0 ) {
	    error_log2 ("Unable to perform read on file: %d  to page %d",
			obj->file_id, obj->obj_id );
	    bhp->flags &= ~BUF_IO_IN_PROGRESS;
	    bhp->flags |= ~BUF_IO_ERROR;
	    return(NULL);
	} 
	if (GET_MASTER) {
		return(NULL);
	}
	bhp->flags &= ~BUF_IO_IN_PROGRESS;
	if ( bhp->wait_proc != -1 ) {
	    /* wake up waiter and anyone waiting on it */
#ifdef DEBUG
	    printf("Waking transaction %d due to completed I/O\n", 
		bhp->wait_proc);
#endif
	    proc_wake_id ( bhp->wait_proc );
	    bhp->wait_proc = -1;
	}
	MAKE_MRU(bhp);
    }

    if ( flags & BF_DIRTY ) {
	bhp->flags |= BUF_DIRTY;
    } else if ( *len < BUFSIZE ) {
	bhp->flags |= BUF_NEWPAGE;
    }
    return ( bhp );
}

int
buf_last ( fid )
int	fid;
{
	int	val;

	if (GET_MASTER) {
		return(-1);
	}
	assert ( fid < NUM_FILE_ENTRIES );
	if ( buf_fids[fid].npages == -1 ) {
	    /* initialize npages field */
	    (void) bf_fid_to_fd ( fid );
	}
	val = buf_fids[fid].npages;	
	if ( val ) {
		val--;			/* Convert to page number */
	}
	if (RELEASE_MASTER) {
		return(-1);
	}
	return(val);
}

#ifdef DEBUG
extern void
buf_dump ( id, all )
int	id;
int	all;
{
	int i;
	BUFHDR_T	*bhp;

	printf ( "LRU + %d\n", *buf_lru );
	if ( all ) {
	    printf("ID\tFID\tPID\tLNEXT\tLPREV\tHNEXT\tHPREV\tSLEEP\tFLAG\tREFS\n");
	    for ( bhp = bufhdr_table, i = 0; i < NUM_BUFS; bhp++, i++ ) {
		    printf ( "%d\t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%x\t%d\n", i,
			bhp->id.file_id, bhp->id.obj_id,
			bhp->lru.next, bhp->lru.prev,
			bhp->hash.next, bhp->hash.prev,
			bhp->wait_proc, bhp->flags, bhp->refcount );
	    }
	} else {
		if ( id >= NUM_BUFS ) {
			printf ( "Buffer ID (%d) too high\n", id );
			return;
		}
		bhp = bufhdr_table+id;
		printf ( "%d\t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%x\t%d\n", i,
		    bhp->id.file_id, bhp->id.obj_id,
		    bhp->lru.next, bhp->lru.prev,
		    bhp->hash.next, bhp->hash.prev,
		    bhp->wait_proc, bhp->flags, bhp->refcount );
	}
	return;
}
#endif

